flink |
您所在的位置:网站首页 › flink 状态定时清理 › flink |
import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.streaming.api.{CheckpointingMode } import org.apache.flink.streaming.api.scala._ object ProcessFunctionTest { def main(args: Array[String]): Unit = { // 这个是流的检查点 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //每隔多久Checkpointing 60000ms env.enableCheckpointing(60000) // 特别注意 如果是测试的话,就1000ms测试看看效果,省的等待 //状态一致性的级别至少一次 默认是精确一次,后面的章节会具体讲解 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) //Checkpoint保存到状态后端 如果因网络延迟等问题的话,超时100000ms=100s 就不做Checkpoint env.getCheckpointConfig.setCheckpointTimeout(100000) //true 表示如果在做Checkpointing的时候出现了失败,就把整个job给取消掉,false就是不取消 env.getCheckpointConfig.setFailOnCheckpointingErrors(false) /* 设置Checkpoints的最大有几个可以并行,当然和setMinPauseBetweenCheckpoints(..)一起设置有点冲突。 注意 Checkpointing可能有先后 但是一个远行Checkpointing可能会好久,所以 很可能会出现某个时间段的并行 */ env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //设置每隔多久Checkpoints,当然和setMaxConcurrentCheckpoints(..)一起设置有点冲突。 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(100) /* DELETE_ON_CANCELLATION表示手动取消的话job,就会删除Checkpoint。 RETAIN_ON_CANCELLATION表示手动取消job人物的话,就不删除Checkpoint。 如果是自动的job任务是失败,Checkpoint是默认保存的 */ env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //如果job失败了,当重启的话,最大3次可以重启,每次重启间隔300ms env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,300) ) //如果job失败了,当重启的话,300s内最大3次可以重启,且每次的间隔为10s,这里的时间单位和收上面的不一样 env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(300), org.apache.flink.api.common.time.Time.seconds(10))) env.execute("test") } } |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |